ShuffleMapStage — Intermediate Shuffle Map Stage in Job
A ShuffleMapStage (aka shuffle map stage, or simply map stage) is an intermediate stage in the execution DAG that produces data for a shuffle operation. It is an input for the following stages in the DAG of stages. That is why it is also called a shuffle dependency’s map side.
Tip: Read about ShuffleDependency.
A ShuffleMapStage may contain multiple pipelined operations, e.g. map and filter, before the shuffle operation.
Caution: FIXME: Show the example and the logs + figures
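As a plain-Scala sketch (not Spark code), "pipelined" means that map and filter are fused into a single lazy pass over each partition's iterator, so no intermediate collection is materialized before the shuffle boundary:

```scala
// Hypothetical sketch of pipelining with a plain Scala Iterator:
// map and filter fuse into one lazy pass over a partition's elements.
val partition = Iterator(1, 2, 3, 4, 5)
val pipelined = partition.map(_ * 2).filter(_ > 4) // nothing computed yet

// Each element flows through both operations before the next is read.
println(pipelined.toList) // List(6, 8, 10)
```

In a real ShuffleMapStage the fused operations run per partition, and only the shuffle that follows moves data between executors.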
A ShuffleMapStage can be part of many jobs — refer to the section ShuffleMapStage sharing.
A ShuffleMapStage is a stage with a ShuffleDependency (the shuffle that it is part of), and its outputLocs and numAvailableOutputs registries track how many map outputs are ready.
Note: ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage for Adaptive Query Planning / Adaptive Scheduling.
When executed, a ShuffleMapStage saves map output files that can later be fetched by reduce tasks. When all map outputs are available, the ShuffleMapStage is considered available (or ready).
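The availability condition can be sketched as a simple comparison (a hypothetical simplification; the names mirror the counters described on this page):

```scala
// Hypothetical sketch: a map stage is available (ready) once every
// partition has at least one registered map output.
def isAvailable(numAvailableOutputs: Int, numPartitions: Int): Boolean =
  numAvailableOutputs == numPartitions

println(isAvailable(numAvailableOutputs = 2, numPartitions = 3)) // false
println(isAvailable(numAvailableOutputs = 3, numPartitions = 3)) // true
```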
Caution: FIXME: Figure with ShuffleMapStages saving files
The output locations (outputLocs) of a ShuffleMapStage are the same as used by its ShuffleDependency. Output locations can be missing, i.e. partitions have not been cached or are lost.
A ShuffleMapStage is registered to DAGScheduler, which tracks the mapping of shuffles (by their ids from SparkContext) to the corresponding ShuffleMapStages that compute them, stored in the shuffleToMapStage internal registry.
A ShuffleMapStage is created from an input ShuffleDependency and a job’s id (in DAGScheduler#newOrUsedShuffleStage).
Caution: FIXME: Where’s shuffleToMapStage used?

- getShuffleMapStage - see Stage sharing
- getAncestorShuffleDependencies
When there is no ShuffleMapStage for a shuffle id (of a ShuffleDependency), one is created with the ancestor shuffle dependencies of the RDD (of the ShuffleDependency) that are registered to MapOutputTrackerMaster.
Caution: FIXME: Where is ShuffleMapStage used?

- newShuffleMapStage - the proper way to create shuffle map stages (with the additional setup steps)
- getShuffleMapStage - see Stage sharing
ShuffleMapStage uses the following internal registries and counters:

Name | Description
---|---
outputLocs | Tracks MapStatuses for each partition. There could be many MapStatus entries per partition. When a ShuffleMapStage is created, outputLocs is empty, i.e. all elements are empty lists. The size of outputLocs is exactly the number of partitions.
_numAvailableOutputs | The number of available outputs for the partitions.
removeOutputsOnExecutor Method

Caution: FIXME
outputLocInMapOutputTrackerFormat Method

Caution: FIXME
addActiveJob Method

Caution: FIXME
Creating ShuffleMapStage Instance

Caution: FIXME

ShuffleMapStage initializes the internal registries and counters.
mapStageJobs Method

Caution: FIXME
shuffleDep Property

Caution: FIXME
removeActiveJob Method

Caution: FIXME
Registering MapStatus For Partition — addOutputLoc Method

addOutputLoc(partition: Int, status: MapStatus): Unit

addOutputLoc adds the input status to the output locations for the input partition.

addOutputLoc increments the _numAvailableOutputs internal counter if the input MapStatus is the first result for the partition.
Note: addOutputLoc is used when DAGScheduler creates a ShuffleMapStage for a ShuffleDependency and an ActiveJob (and MapOutputTrackerMaster tracks some output locations of the ShuffleDependency) and when a ShuffleMapTask has finished.
Removing MapStatus For Partition And BlockManager — removeOutputLoc Method

removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit

removeOutputLoc removes the MapStatus for the input partition and bmAddress BlockManager from the output locations.

removeOutputLoc decrements the _numAvailableOutputs internal counter if the removed MapStatus was the last result for the partition.
Note: removeOutputLoc is used exclusively when a Task has failed with a FetchFailed exception.
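The counter bookkeeping described in the two sections above can be sketched in plain Scala. This is a simplified, hypothetical model (the types and class below are stand-ins, not Spark's actual implementation):

```scala
// Hypothetical stand-ins for Spark's BlockManagerId and MapStatus.
case class BlockManagerId(executorId: String)
case class MapStatus(location: BlockManagerId)

// Simplified model of a ShuffleMapStage's output-location registries.
class OutputLocsSketch(numPartitions: Int) {
  // One list of MapStatuses per partition; all lists start empty.
  val outputLocs: Array[List[MapStatus]] = Array.fill(numPartitions)(Nil)
  var numAvailableOutputs: Int = 0

  def addOutputLoc(partition: Int, status: MapStatus): Unit = {
    val prev = outputLocs(partition)
    outputLocs(partition) = status :: prev
    if (prev.isEmpty) numAvailableOutputs += 1 // first result for partition
  }

  def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = {
    val prev = outputLocs(partition)
    val next = prev.filterNot(_.location == bmAddress)
    outputLocs(partition) = next
    if (prev.nonEmpty && next.isEmpty) numAvailableOutputs -= 1 // last result gone
  }
}
```

For example, registering one output for a partition raises numAvailableOutputs to 1, and removing that output (e.g. after a FetchFailed from its BlockManager) brings the counter back down.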
Finding Missing Partitions — findMissingPartitions Method

findMissingPartitions(): Seq[Int]

Note: findMissingPartitions is a part of the Stage contract that returns the partitions that are missing, i.e. are yet to be computed.
Internally, findMissingPartitions uses the outputLocs internal registry to find the indices with empty lists of MapStatus.
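A minimal sketch of that lookup, assuming outputLocs is an array with one list of MapStatuses per partition (the MapStatus type below is a hypothetical stand-in):

```scala
// Hypothetical stand-in for Spark's MapStatus.
case class MapStatus(location: String)

// Partitions whose list of MapStatuses is empty are still missing.
def findMissingPartitions(outputLocs: Array[List[MapStatus]]): Seq[Int] =
  outputLocs.indices.filter(i => outputLocs(i).isEmpty)

val outputLocs: Array[List[MapStatus]] =
  Array(List(MapStatus("exec-0")), Nil, Nil)
println(findMissingPartitions(outputLocs)) // partitions 1 and 2 are missing
```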
ShuffleMapStage Sharing
A ShuffleMapStage can be shared across multiple jobs, if these jobs reuse the same RDDs. When a ShuffleMapStage is submitted to DAGScheduler to execute, getShuffleMapStage is called.
scala> val rdd = sc.parallelize(0 to 5).map((_,1)).sortByKey() (1)
scala> rdd.count (2)
scala> rdd.count (3)
1. Shuffle at sortByKey()
2. Submits a job with two stages, with both being executed
3. Intentionally repeat the last action that submits a new job with two stages, with one being shared as already computed
